In [ ]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os
import sys
from numpy import nan 
%matplotlib inline
import seaborn as sns
sns.set_context('notebook',font_scale=1.25)
from IPython.display import display, HTML
import scipy.stats
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession 
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext  
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

Reading all the data file path i.e CSV path¶

In [ ]:
import os
import glob

path  = os.path.abspath(os.getcwd()) + "\\DataSetAdventureWorksCSV"

extension = 'csv'
os.chdir(path)
path_csv = glob.glob('*.{}'.format(extension))

INITIALISING SPARK¶

In [ ]:
import findspark
findspark.init()


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigData').getOrCreate()

# ** Creating a DWH class to create mapping of path of CSV with thier identifier
class DWH:
    csvdict = {}
    for file in path_csv:
        csvdict[file[:-4]] = file
DWH().csvdict
Out[ ]:
{'DimAccount': 'DimAccount.csv',
 'DimCurrency': 'DimCurrency.csv',
 'DimCustomer': 'DimCustomer.csv',
 'DimDate': 'DimDate.csv',
 'DimDepartmentGroup': 'DimDepartmentGroup.csv',
 'DimGeography': 'DimGeography.csv',
 'DimOrganization': 'DimOrganization.csv',
 'DimProduct': 'DimProduct.csv',
 'DimProductCategory': 'DimProductCategory.csv',
 'DimProductSubcategory': 'DimProductSubcategory.csv',
 'DimPromotion': 'DimPromotion.csv',
 'DimReseller': 'DimReseller.csv',
 'DimSalesReason': 'DimSalesReason.csv',
 'DimSalesTerritory': 'DimSalesTerritory.csv',
 'DimScenario': 'DimScenario.csv',
 'FactCallCenter': 'FactCallCenter.csv',
 'FactCurrencyRate': 'FactCurrencyRate.csv',
 'FactFinance': 'FactFinance.csv',
 'FactInternetSales': 'FactInternetSales.csv',
 'FactSalesTargets': 'FactSalesTargets.csv'}
In [ ]:
csvdf  = pd.DataFrame(DWH.csvdict.items(), columns=['key', 'path'])
csvdf
Out[ ]:
key path
0 DimAccount DimAccount.csv
1 DimCurrency DimCurrency.csv
2 DimCustomer DimCustomer.csv
3 DimDate DimDate.csv
4 DimDepartmentGroup DimDepartmentGroup.csv
5 DimGeography DimGeography.csv
6 DimOrganization DimOrganization.csv
7 DimProduct DimProduct.csv
8 DimProductCategory DimProductCategory.csv
9 DimProductSubcategory DimProductSubcategory.csv
10 DimPromotion DimPromotion.csv
11 DimReseller DimReseller.csv
12 DimSalesReason DimSalesReason.csv
13 DimSalesTerritory DimSalesTerritory.csv
14 DimScenario DimScenario.csv
15 FactCallCenter FactCallCenter.csv
16 FactCurrencyRate FactCurrencyRate.csv
17 FactFinance FactFinance.csv
18 FactInternetSales FactInternetSales.csv
19 FactSalesTargets FactSalesTargets.csv

Creating PySpark DataFrame from CSV file¶

In [ ]:
FactInternetSales = spark.read.option('header','true').csv(DWH.csvdict['FactInternetSales'])
DimCustomer = spark.read.option('header','true').csv(DWH.csvdict['DimCustomer'])
DimProduct = spark.read.option('header','true').csv(DWH.csvdict['DimProduct'])
DimProductSubcategory = spark.read.option('header','true').csv(DWH.csvdict['DimProductSubcategory'])
DimProductCategory = spark.read.option('header','true').csv(DWH.csvdict['DimProductCategory'])
DimSalesTerritory = spark.read.option('header','true').csv(DWH.csvdict['DimSalesTerritory'])
DimGeography = spark.read.option('header','true').csv(DWH.csvdict['DimGeography'])

Top 5 Rows Using Pyspark¶

In [ ]:
FactInternetSales.show(5)
+----------+------------+----------+-----------+-----------+------------+-----------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+---------------------+----------------+--------------------+--------------------+--------------------+
|ProductKey|OrderDateKey|DueDateKey|ShipDateKey|CustomerKey|PromotionKey|CurrencyKey|SalesTerritoryKey|SalesOrderNumber|SalesOrderLineNumber|RevisionNumber|OrderQuantity|UnitPrice|ExtendedAmount|UnitPriceDiscountPct|DiscountAmount|ProductStandardCost|TotalProductCost|SalesAmount|  TaxAmt|Freight|CarrierTrackingNumber|CustomerPONumber|           OrderDate|             DueDate|            ShipDate|
+----------+------------+----------+-----------+-----------+------------+-----------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+---------------------+----------------+--------------------+--------------------+--------------------+
|       310|    20101229|  20110110|   20110105|      21768|           1|         19|                6|         SO43697|                   1|             1|            1|  3578.27|       3578.27|                   0|             0|          2171.2942|       2171.2942|    3578.27|286.2616|89.4568|                 NULL|            NULL|2010-12-29 00:00:...|2011-01-10 00:00:...|2011-01-05 00:00:...|
|       346|    20101229|  20110110|   20110105|      28389|           1|         39|                7|         SO43698|                   1|             1|            1|  3399.99|       3399.99|                   0|             0|          1912.1544|       1912.1544|    3399.99|271.9992|84.9998|                 NULL|            NULL|2010-12-29 00:00:...|2011-01-10 00:00:...|2011-01-05 00:00:...|
|       346|    20101229|  20110110|   20110105|      25863|           1|        100|                1|         SO43699|                   1|             1|            1|  3399.99|       3399.99|                   0|             0|          1912.1544|       1912.1544|    3399.99|271.9992|84.9998|                 NULL|            NULL|2010-12-29 00:00:...|2011-01-10 00:00:...|2011-01-05 00:00:...|
|       336|    20101229|  20110110|   20110105|      14501|           1|        100|                4|         SO43700|                   1|             1|            1| 699.0982|      699.0982|                   0|             0|           413.1463|        413.1463|   699.0982| 55.9279|17.4775|                 NULL|            NULL|2010-12-29 00:00:...|2011-01-10 00:00:...|2011-01-05 00:00:...|
|       346|    20101229|  20110110|   20110105|      11003|           1|          6|                9|         SO43701|                   1|             1|            1|  3399.99|       3399.99|                   0|             0|          1912.1544|       1912.1544|    3399.99|271.9992|84.9998|                 NULL|            NULL|2010-12-29 00:00:...|2011-01-10 00:00:...|2011-01-05 00:00:...|
+----------+------------+----------+-----------+-----------+------------+-----------+-----------------+----------------+--------------------+--------------+-------------+---------+--------------+--------------------+--------------+-------------------+----------------+-----------+--------+-------+---------------------+----------------+--------------------+--------------------+--------------------+
only showing top 5 rows

Top 5 Rows Using Pandas¶

In [ ]:
FactInternetSales.toPandas().head()
Out[ ]:
ProductKey OrderDateKey DueDateKey ShipDateKey CustomerKey PromotionKey CurrencyKey SalesTerritoryKey SalesOrderNumber SalesOrderLineNumber ... ProductStandardCost TotalProductCost SalesAmount TaxAmt Freight CarrierTrackingNumber CustomerPONumber OrderDate DueDate ShipDate
0 310 20101229 20110110 20110105 21768 1 19 6 SO43697 1 ... 2171.2942 2171.2942 3578.27 286.2616 89.4568 NULL NULL 2010-12-29 00:00:00.000 2011-01-10 00:00:00.000 2011-01-05 00:00:00.000
1 346 20101229 20110110 20110105 28389 1 39 7 SO43698 1 ... 1912.1544 1912.1544 3399.99 271.9992 84.9998 NULL NULL 2010-12-29 00:00:00.000 2011-01-10 00:00:00.000 2011-01-05 00:00:00.000
2 346 20101229 20110110 20110105 25863 1 100 1 SO43699 1 ... 1912.1544 1912.1544 3399.99 271.9992 84.9998 NULL NULL 2010-12-29 00:00:00.000 2011-01-10 00:00:00.000 2011-01-05 00:00:00.000
3 336 20101229 20110110 20110105 14501 1 100 4 SO43700 1 ... 413.1463 413.1463 699.0982 55.9279 17.4775 NULL NULL 2010-12-29 00:00:00.000 2011-01-10 00:00:00.000 2011-01-05 00:00:00.000
4 346 20101229 20110110 20110105 11003 1 6 9 SO43701 1 ... 1912.1544 1912.1544 3399.99 271.9992 84.9998 NULL NULL 2010-12-29 00:00:00.000 2011-01-10 00:00:00.000 2011-01-05 00:00:00.000

5 rows × 26 columns

In [ ]:
FactInternetSales.printSchema() ##To help me understand the data-type of my sales data
root
 |-- ProductKey: string (nullable = true)
 |-- OrderDateKey: string (nullable = true)
 |-- DueDateKey: string (nullable = true)
 |-- ShipDateKey: string (nullable = true)
 |-- CustomerKey: string (nullable = true)
 |-- PromotionKey: string (nullable = true)
 |-- CurrencyKey: string (nullable = true)
 |-- SalesTerritoryKey: string (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- SalesOrderLineNumber: string (nullable = true)
 |-- RevisionNumber: string (nullable = true)
 |-- OrderQuantity: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- ExtendedAmount: string (nullable = true)
 |-- UnitPriceDiscountPct: string (nullable = true)
 |-- DiscountAmount: string (nullable = true)
 |-- ProductStandardCost: string (nullable = true)
 |-- TotalProductCost: string (nullable = true)
 |-- SalesAmount: string (nullable = true)
 |-- TaxAmt: string (nullable = true)
 |-- Freight: string (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- CustomerPONumber: string (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- ShipDate: string (nullable = true)

REGISTER TABLES¶

In [ ]:
FactInternetSales.createOrReplaceTempView('FactInternetSalesVW')
DimCustomer.createOrReplaceTempView('DimCustomerVW')
DimProduct.createOrReplaceTempView('DimProductVW')
DimProductSubcategory.createOrReplaceTempView('DimProductSubcategoryVW')
DimProductCategory.createOrReplaceTempView('DimProductCategoryVW')
DimSalesTerritory.createOrReplaceTempView('DimSalesTerritoryVW')
DimGeography.createOrReplaceTempView('DimGeographyVW')

TASK1 : EDA WITH SPARK SQL

In [ ]:
d = dict( 
    FactInternetSalesVW = FactInternetSales.columns,
    DimCustomerVW = DimCustomer.columns,
    DimProductVW = DimProduct.columns,
    DimProductSubcategoryVW = DimProductSubcategory.columns,
    DimProductCategoryVW = DimProductCategory.columns,
    DimSalesTerritoryVW = DimSalesTerritory.columns,
    DimGeographyVW = DimGeography.columns
    
)   
pd.DataFrame(dict([ (k,pd.Series(v)) for k,v in d.items() ]))
Out[ ]:
FactInternetSalesVW DimCustomerVW DimProductVW DimProductSubcategoryVW DimProductCategoryVW DimSalesTerritoryVW DimGeographyVW
0 ProductKey CustomerKey ProductKey ProductSubcategoryKey ProductCategoryKey SalesTerritoryKey GeographyKey
1 OrderDateKey GeographyKey ProductAlternateKey ProductSubcategoryAlternateKey ProductCategoryAlternateKey SalesTerritoryAlternateKey City
2 DueDateKey CustomerAlternateKey ProductSubcategoryKey EnglishProductSubcategoryName EnglishProductCategoryName SalesTerritoryRegion StateProvinceCode
3 ShipDateKey Title WeightUnitMeasureCode ProductCategoryKey SpanishProductCategoryName SalesTerritoryCountry StateProvinceName
4 CustomerKey FirstName SizeUnitMeasureCode NaN FrenchProductCategoryName SalesTerritoryGroup CountryRegionCode
5 PromotionKey MiddleName EnglishProductName NaN NaN NaN EnglishCountryRegionName
6 CurrencyKey LastName SpanishProductName NaN NaN NaN SpanishCountryRegionName
7 SalesTerritoryKey NameStyle FrenchProductName NaN NaN NaN FrenchCountryRegionName
8 SalesOrderNumber BirthDate StandardCost NaN NaN NaN PostalCode
9 SalesOrderLineNumber MaritalStatus FinishedGoodsFlag NaN NaN NaN SalesTerritoryKey
10 RevisionNumber Suffix Color NaN NaN NaN IpAddressLocator
11 OrderQuantity Gender SafetyStockLevel NaN NaN NaN NaN
12 UnitPrice EmailAddress ReorderPoint NaN NaN NaN NaN
13 ExtendedAmount YearlyIncome ListPrice NaN NaN NaN NaN
14 UnitPriceDiscountPct TotalChildren Size NaN NaN NaN NaN
15 DiscountAmount NumberChildrenAtHome SizeRange NaN NaN NaN NaN
16 ProductStandardCost EnglishEducation Weight NaN NaN NaN NaN
17 TotalProductCost SpanishEducation DaysToManufacture NaN NaN NaN NaN
18 SalesAmount FrenchEducation ProductLine NaN NaN NaN NaN
19 TaxAmt EnglishOccupation DealerPrice NaN NaN NaN NaN
20 Freight SpanishOccupation Class NaN NaN NaN NaN
21 CarrierTrackingNumber FrenchOccupation Style NaN NaN NaN NaN
22 CustomerPONumber HouseOwnerFlag ModelName NaN NaN NaN NaN
23 OrderDate NumberCarsOwned EnglishDescription NaN NaN NaN NaN
24 DueDate AddressLine1 StartDate NaN NaN NaN NaN
25 ShipDate AddressLine2 EndDate NaN NaN NaN NaN
26 NaN Phone Status NaN NaN NaN NaN
27 NaN DateFirstPurchase NaN NaN NaN NaN NaN
28 NaN CommuteDistance NaN NaN NaN NaN NaN

5NF Star Schema:¶

In [ ]:
from IPython.display import Image
from IPython.core.display import HTML 
Image(url= "https://editor.analyticsvidhya.com/uploads/381435NF%20Star%20Schema.png")
Out[ ]:
In [ ]:
# simple read data using Joins

MyOrders = spark.sql('''

select
  Orders.SalesOrderNumber,
  Pro.EnglishProductName as ProductName,
  ProSubCat.EnglishProductSubcategoryName as ProductSubcategoryName,
  ProCat.EnglishProductCategoryName as ProductcategoryName,
  INT(Orders.OrderQuantity) as OrderQuantity,
  ROUND(DOUBLE(Orders.SalesAmount),2) as SalesAmount,
  ROUND((Orders.SalesAmount -  Orders.TotalProductCost),2) as Margin,
  DATE(Orders.OrderDate) as OrderDate,
  date_format(Orders.OrderDate,'MMM') as MonthName,
  CONCAT('Q',Quarter(Orders.OrderDate)) as Quarter,
  weekofyear(Orders.OrderDate) as WeekNo,
  YEAR(Orders.OrderDate) as OrderYear,
  CONCAT(YEAR(Orders.OrderDate),'-',CONCAT('Q',Quarter(Orders.OrderDate))) as YYQQ,
  INT(Orders.CustomerKey) as CustKey,
  CONCAT(cust.FirstName,' ',cust.LastName) as CustomerName,
  cust.Gender as Gender,
  Date(cust.Birthdate) as BirthDate,
  YEAR(current_date()) - YEAR(Date(cust.Birthdate)) as AGE,
  ROUND(INT(cust.YearlyIncome)) as YearlyIncome,
  CASE
      WHEN ROUND(INT(cust.YearlyIncome)) <= 40000 THEN 'LOW INCOME'
      WHEN ROUND(INT(cust.YearlyIncome)) <= 60000 THEN 'MODERATE INCOME'
      ELSE 'HIGH INCOME'
  END AS IncomeCategory,
  Geo.City as CustomerCity,
  SalesTerrority.SalesTerritoryCountry as SalesCountry,
  SalesTerrority.SalesTerritoryGroup as SalesRegion
  
from FactInternetSalesVW Orders
LEFT JOIN DimCustomerVW Cust
ON (Orders.CustomerKey = Cust.CustomerKey)
LEFT JOIN DimProductVW Pro
ON (Orders.ProductKey = Pro.ProductKey)
LEFT JOIN DimProductSubcategoryVW ProSubCat
ON (Pro.ProductSubcategoryKey = ProSubCat.ProductSubcategoryKey)
LEFT JOIN DimProductCategoryVW ProCat
ON (ProCat.ProductCategoryKey = ProSubCat.ProductCategoryKey)
LEFT JOIN DimSalesTerritoryVW SalesTerrority
ON (Orders.SalesTerritoryKey = SalesTerrority.SalesTerritoryKey)
LEFT JOIN DimGeographyVW Geo
ON (Cust.GeographyKey = Geo.GeographyKey)
Order By OrderDate Asc

''')


MyOrders.createOrReplaceTempView('MyOrdersView')
SalesDataDF = MyOrders.toPandas() ## we will use MyorderDF for further analysis
pd.set_option('display.max_columns', None)
print(SalesDataDF.shape)
SalesDataDF.head()
(60398, 23)
Out[ ]:
SalesOrderNumber ProductName ProductSubcategoryName ProductcategoryName OrderQuantity SalesAmount Margin OrderDate MonthName Quarter WeekNo OrderYear YYQQ CustKey CustomerName Gender BirthDate AGE YearlyIncome IncomeCategory CustomerCity SalesCountry SalesRegion
0 SO43697 Road-150 Red, 62 Road Bikes Bikes 1 3578.27 1406.98 2010-12-29 Dec Q4 52 2010 2010-Q4 21768 Cole Watson M 1946-08-22 77 70000 HIGH INCOME Metchosin Canada North America
1 SO43698 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 28389 Rachael Martinez F 1964-12-18 59 20000 LOW INCOME Pantin France Europe
2 SO43699 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 25863 Sydney Wright F 1946-12-03 77 40000 LOW INCOME Lebanon United States North America
3 SO43700 Road-650 Black, 62 Road Bikes Bikes 1 699.10 285.95 2010-12-29 Dec Q4 52 2010 2010-Q4 14501 Ruben Prasad M 1938-05-13 85 80000 HIGH INCOME Beverly Hills United States North America
4 SO43701 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 11003 Christy Zhu F 1968-02-15 55 70000 HIGH INCOME North Ryde Australia Pacific
In [ ]:
## To check Null Values in our Final Data
from pyspark.sql.functions import isnan, when, count, col
MyOrders.select([count(when( col(c).isNull(), c)).alias(c) for c in MyOrders.columns]).toPandas()
Out[ ]:
SalesOrderNumber ProductName ProductSubcategoryName ProductcategoryName OrderQuantity SalesAmount Margin OrderDate MonthName Quarter WeekNo OrderYear YYQQ CustKey CustomerName Gender BirthDate AGE YearlyIncome IncomeCategory CustomerCity SalesCountry SalesRegion
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
In [ ]:
SalesDataDF[['SalesAmount','Margin','YearlyIncome']].describe()
Out[ ]:
SalesAmount Margin YearlyIncome
count 60398.000000 60398.000000 60398.000000
mean 486.086922 200.020197 59715.056790
std 928.489878 378.960731 33065.426837
min 2.290000 1.430000 10000.000000
25% 7.950000 3.120000 30000.000000
50% 29.990000 15.640000 60000.000000
75% 539.990000 196.340000 80000.000000
max 3578.270000 1487.840000 170000.000000
In [ ]:
# Step 1) Checking the Rows With Nulls
SalesDataDF[SalesDataDF.isna().any(axis=1)]
Out[ ]:
SalesOrderNumber ProductName ProductSubcategoryName ProductcategoryName OrderQuantity SalesAmount Margin OrderDate MonthName Quarter WeekNo OrderYear YYQQ CustKey CustomerName Gender BirthDate AGE YearlyIncome IncomeCategory SalesCountry SalesRegion
In [ ]:
# checking for NaN instances
no_of_nan_values = SalesDataDF.isna().sum().sum()
display(HTML(f"<h3>  <b style='color:#800000;font-size:22px;'>Inference of MyOrders Fact Dataframe </b>:<ul><li>There are {no_of_nan_values} nan values.</li></ul></h3>"))

Inference of MyOrders Fact Dataframe :
  • There are 0 nan values.

In [ ]:
SalesDataDF.columns
Out[ ]:
Index(['SalesOrderNumber', 'ProductName', 'ProductSubcategoryName',
       'ProductcategoryName', 'OrderQuantity', 'SalesAmount', 'Margin',
       'OrderDate', 'MonthName', 'Quarter', 'WeekNo', 'OrderYear', 'YYQQ',
       'CustKey', 'CustomerName', 'Gender', 'BirthDate', 'AGE', 'YearlyIncome',
       'IncomeCategory', 'SalesCountry', 'SalesRegion'],
      dtype='object')
In [ ]:
MyOrders
Out[ ]:
DataFrame[SalesOrderNumber: string, ProductName: string, ProductSubcategoryName: string, ProductcategoryName: string, OrderQuantity: int, SalesAmount: double, Margin: double, OrderDate: date, MonthName: string, Quarter: string, WeekNo: int, OrderYear: int, YYQQ: string, CustKey: int, CustomerName: string, Gender: string, BirthDate: date, AGE: int, YearlyIncome: int, IncomeCategory: string, SalesCountry: string, SalesRegion: string]
In [ ]:
from pandas_profiling import ProfileReport
Salesprofile = ProfileReport(SalesDataDF)
# Salesprofile

Query for getting Total Sales By Country¶

In [ ]:
# group and get stats

salesbyqtr = spark.sql('''
select
   SalesCountry,
   YYQQ,
   OrderYear,
   count(SalesOrderNumber) as Total_sales,
   CONCAT('$ ', ROUND(SUM(SalesAmount) / 1000000,1), 'M') as TotalRevenueMillion
from MyOrdersView 
group by SalesCountry,OrderYear,YYQQ
ORDER BY YYQQ
''')

# to pandas dataframe
df_salesbyqtr = salesbyqtr.toPandas()
df_salesbyqtr.head(n=10)
Out[ ]:
SalesCountry YYQQ OrderYear Total_sales TotalRevenueMillion
0 France 2010-Q4 2010 1 $ 0.0M
1 United States 2010-Q4 2010 5 $ 0.0M
2 Australia 2010-Q4 2010 6 $ 0.0M
3 Canada 2010-Q4 2010 1 $ 0.0M
4 United Kingdom 2010-Q4 2010 1 $ 0.0M
5 Canada 2011-Q1 2011 20 $ 0.1M
6 Australia 2011-Q1 2011 177 $ 0.6M
7 France 2011-Q1 2011 30 $ 0.1M
8 United Kingdom 2011-Q1 2011 51 $ 0.2M
9 United States 2011-Q1 2011 130 $ 0.4M
In [ ]:
import plotly.express as px

import plotly.io as pio
pio.renderers.default ='notebook'

fig = px.line(df_salesbyqtr[(df_salesbyqtr['OrderYear'] >= 2012)], x="YYQQ", y="TotalRevenueMillion", color='SalesCountry')
fig.show()
In [ ]:
# group and get stats
countrygrouped = spark.sql('''
select
   SalesCountry
  ,count(SalesOrderNumber) as total_sales
from MyOrdersView 
group by SalesCountry
''')

# to pandas dataframe
df_TotalSalesByCountry = countrygrouped.toPandas()
df_TotalSalesByCountry.head()
Out[ ]:
SalesCountry total_sales
0 Germany 5625
1 France 5558
2 United States 21344
3 Canada 7620
4 Australia 13345
In [ ]:
import plotly.express as px
fig = px.bar(df_TotalSalesByCountry, x='total_sales', y='SalesCountry',text_auto=True)
fig.update_layout(barmode='stack', yaxis={'categoryorder':'total ascending'})
fig.show()

What is RFM?¶

RFM is a method used to analyze customer value. RFM stands for RECENCY, Frequency, and Monetary.

RECENCY: How recently did the customer visit our website or how recently did a customer purchase?

Frequency: How often do they visit or how often do they purchase?

Why is it needed?¶

RFM Analysis is a marketing framework that is used to understand and analyze customer behaviour based on the above three factors RECENCY, Frequency, and Monetary.

The RFM Analysis will help the businesses to segment their customer base into different homogenous groups so that they can engage with each group with different targeted marketing strategies.

Calculating R, F, and M values in Python:¶

From the sales data we have, we calculate RFM values in Python and Analyze the customer behaviour and segment the customers based on RFM values.

We attempt to find churn bheavior of the customer,Grouping Customer into two segment Using RFM:

  • CHURN
  • NOT CHURN

Read the data¶

In [ ]:
SalesDataDF.to_excel('SalesOrder.xlsx',sheet_name='SalesData', index=False)
SalesDataDF.head()
Out[ ]:
SalesOrderNumber ProductName ProductSubcategoryName ProductcategoryName OrderQuantity SalesAmount Margin OrderDate MonthName Quarter WeekNo OrderYear YYQQ CustKey CustomerName Gender BirthDate AGE YearlyIncome IncomeCategory SalesCountry SalesRegion
0 SO43697 Road-150 Red, 62 Road Bikes Bikes 1 3578.27 1406.98 2010-12-29 Dec Q4 52 2010 2010-Q4 21768 Cole Watson M 1946-08-22 77 70000 HIGH INCOME Canada North America
1 SO43698 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 28389 Rachael Martinez F 1964-12-18 59 20000 LOW INCOME France Europe
2 SO43699 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 25863 Sydney Wright F 1946-12-03 77 40000 LOW INCOME United States North America
3 SO43700 Road-650 Black, 62 Road Bikes Bikes 1 699.10 285.95 2010-12-29 Dec Q4 52 2010 2010-Q4 14501 Ruben Prasad M 1938-05-13 85 80000 HIGH INCOME United States North America
4 SO43701 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 11003 Christy Zhu F 1968-02-15 55 70000 HIGH INCOME Australia Pacific
In [ ]:
fig, axarr = plt.subplots(1, 2,figsize = (15,6))
SalesDataDF[['ProductcategoryName','SalesAmount']].groupby("ProductcategoryName").sum().plot(kind="bar",ax=axarr[0])
SalesDataDF[['ProductcategoryName','OrderQuantity']].groupby("ProductcategoryName").sum().plot(kind="bar",ax=axarr[1])
Out[ ]:
<AxesSubplot:xlabel='ProductcategoryName'>

We can see, Bikes account for huge revenue generation even though accessories are being sold in high quantity. This might be because the cost of Bikes will be higher than the cost of Accessories.

In [ ]:
#Similarly, we can check which region has a higher customer base.

fig, axarr = plt.subplots(1, 2,figsize = (15,6))
plt.xticks(rotation=45)
Customer_Country = SalesDataDF.groupby('SalesCountry')['CustKey'].nunique().sort_values(ascending=False).reset_index().head(11)
snsplot1 = sns.barplot(data=Customer_Country,x='SalesCountry',y='CustKey',palette='Blues',ax=axarr[0])
snsplot1.set_xticklabels(snsplot1.get_xticklabels(),rotation=30)

Customer_Region = SalesDataDF.groupby('SalesRegion')['CustKey'].nunique().sort_values(ascending=False).reset_index().head(11)
snsplot2 = sns.barplot(data=Customer_Region,x='SalesRegion',y='CustKey',palette='Blues',ax=axarr[1])
snsplot2.set_xticklabels(snsplot2.get_xticklabels(),rotation=30)
Out[ ]:
[Text(0, 0, 'North America'), Text(1, 0, 'Europe'), Text(2, 0, 'Pacific')]

Calculate R, F, and M values:¶

Recency¶

In [ ]:
df_recency = SalesDataDF
df_recency = df_recency.groupby(by='CustKey',as_index=False)['OrderDate'].max()
df_recency.columns = ['CustomerKey','max_date']
df_recency.head() ## Most Recent Purchase is mapped to each customer ID
Out[ ]:
CustomerKey max_date
0 11000 2013-05-03
1 11001 2013-12-10
2 11002 2013-02-23
3 11003 2013-05-10
4 11004 2013-05-01

Since, we have most recent purchase date. Now, We will calculate days diff from today's Date.

reference date = Maximum Date from the Sales Date

The difference between the reference date and maximum date in the dataframe for each customer(which is the recent visit) is Recency

In [ ]:
reference_date = max(df_recency.max_date)
print("Reference Date is : ",reference_date)
df_recency['Recency'] = df_recency['max_date'].apply(lambda row: (reference_date - row).days)
df_recency.drop('max_date',inplace=True,axis=1)
Reference Date is :  2014-01-28

Frequency:¶

We can get the Frequency of the customer by summing up the number of orders.

In [ ]:
df_frequency = SalesDataDF
df_frequency = df_frequency.groupby(by='CustKey',as_index=False)['SalesOrderNumber'].nunique()
df_frequency.columns = ['CustomerKey','Frequency']
print("Frequency Stats :")
Frequency Stats :
In [ ]:
df_frequency.groupby(by='Frequency',as_index=False).count().rename(columns={'CustomerKey':'Total_Numbers'}).head()
Out[ ]:
Frequency Total_Numbers
0 1 11619
1 2 5454
2 3 1166
3 4 150
4 5 51
In [ ]:
sns.histplot(data=df_frequency[(df_frequency['Frequency'] <=5)], x="Frequency",bins= 30,binwidth=30,kde=True,discrete=True,log_scale=False)
Out[ ]:
<AxesSubplot:xlabel='Frequency', ylabel='Count'>

Inference from Frequency Hist Plot:

We can see the customers who order 2 times are more and then we see who orders 3 times. But there is very less number of customers that orders more than 5 times.

Monetary:¶

Now, it’s time for our last value which is Monetary.

Monetary can be calculated as the sum of the Amount of all orders by each customer.

In [ ]:
df_monetary = SalesDataDF
df_monetary = df_monetary.groupby(by='CustKey',as_index=False)['SalesAmount'].sum()

df_monetary.columns = ['CustomerKey','Monetary']
In [ ]:
plt.figure(figsize=(8,5))
sns.displot(data=df_monetary[(df_monetary['Monetary'] <= 1500)], x="Monetary",bins='auto',kde=True,rug=True)
Out[ ]:
<seaborn.axisgrid.FacetGrid at 0x1c34fff5820>
<Figure size 800x500 with 0 Axes>

It is evident that most of the money that customers spend is under $200. They may be purchasing additional accessories, which could explain this. This happens frequently because we only buy bikes once or twice a year, but we buy accessories more frequently.

We are unable to draw any conclusions based just on recent, frequent, or monetary values. All three elements must be considered.

Let's combine the numbers for recency, frequency, and money to produce a new dataframe.

In [ ]:
r_f = df_recency.merge(df_frequency,on='CustomerKey')
r_f.head()
Out[ ]:
CustomerKey Recency Frequency
0 11000 270 3
1 11001 49 3
2 11002 339 3
3 11003 263 3
4 11004 272 3
In [ ]:
RFM = r_f.merge(df_monetary,on='CustomerKey')
RFM.head()
RFM.to_excel('RFM.xlsx',index = False)
In [ ]:
# RFM_Profile = ProfileReport(RFM[['Recency','Frequency','Monetary']])
# RFM_Profile
In [ ]:
plt.scatter(
    RFM.groupby('CustomerKey')['Recency'].sum(),
    RFM.groupby('CustomerKey')['Frequency'].sum(),
    color = 'red',
    marker = '*', alpha = 0.3
)
plt.title('Scatter Plot for : Recency Vs Frequency') 
plt.xlabel('Recency')
plt.ylabel('Frequency')
Out[ ]:
Text(0, 0.5, 'Frequency')
In [ ]:
plt.scatter(
    RFM.groupby('CustomerKey')['Monetary'].sum(),
    RFM.groupby('CustomerKey')['Frequency'].sum(),
    color = 'red',
    marker = '*', alpha = 0.3
)
plt.title('Scatter Plot for : Monetary Vs frequency') 
plt.xlabel('Monetary')
plt.ylabel('Frequency')
Out[ ]:
Text(0, 0.5, 'Frequency')

Inference from Recency Vs frequency Scatter Plot:

We can see the customers whose Recency is less than a month have high Frequency i.e the customers buying more when their recency is less.

In [ ]:
plt.scatter(
    RFM.groupby('CustomerKey')['Monetary'].sum(),
    RFM.groupby('CustomerKey')['Frequency'].sum(),
    color = 'red',
    marker = '*', alpha = 0.3
)
plt.title('Scatter Plot for : Monetary Vs frequency') 
plt.xlabel('Recency')
plt.ylabel('Frequency')
Out[ ]:
Text(0, 0.5, 'Frequency')

Inference from Monetary Vs frequency Scatter Plot:

We can see, customers buying frequently are spending less amount. This might be because we frequently buy Accessories which are less costly.

In [ ]:
## Bucketing Recency:
def R_Score(x):
    if x['Recency'] <= 30:
        recency = 3
    elif x['Recency'] <= 60:
        recency = 2
    else:
        recency = 1
    return recency

RFM['R'] = RFM.apply(R_Score,axis=1)
In [ ]:
## Bucketing Frequency

def F_Score(x):
    if x['Frequency'] >= 3 :
        Frequency = 3
    elif x['Frequency'] == 2:
        Frequency = 2
    else:
        Frequency = 1
    return Frequency

RFM['F'] = RFM.apply(F_Score,axis=1)
In [ ]:
RFM['M'] = pd.qcut(RFM['Monetary'],q=3,labels=range(1,4))

R-F-M Score¶

Now, let’s find the R-F-M Score for each customer by combining each factor.

In [ ]:
def RFM_Score(x):
    return int(str(int(x['R'])) + str(int(x['F'])) + str(int(x['M'])))
RFM['RFM_SCORE'] = RFM.apply(RFM_Score,axis=1)
In [ ]:
RFM['RFM_Score_AGG'] = np.array(RFM['R']) + np.array(RFM['F']) + np.array(RFM['M'])
In [ ]:
imp_numerical_cols = ['Recency', 'Frequency', 'Monetary']
df = RFM
for i,col in enumerate(imp_numerical_cols):
    text="<center><h3><b  style='color:black'>" + f"Distribution of</b> <b style='color:#800080'> {col}"+ "</b></h3><hr></center>"
    display(HTML(text))
    plt.figure(figsize=(16,5))
    bins='auto'
    plt.subplot(1,3,1)
    sns.histplot(data=df,x=col,stat='density',bins=bins,color='lightblue')
    plt.title(f'Histogram of {col}',size=15)
    
    plt.subplot(1,3,2)
    sns.kdeplot(data=df,x=col,color='red')
    plt.title(f'Kdeplot of {col}',size=15)
    
    plt.subplot(1,3,3)
    sns.boxplot(data=df,x=col,color='lightblue',showmeans=True)
    plt.title(f'Box-plot of {col}',size=15)
    plt.tight_layout()
    plt.show()

Distribution of Recency


Distribution of Frequency


Distribution of Monetary


In [ ]:
RFM['churn'] = RFM[['Recency', 'Frequency', 'Monetary']].apply(
    lambda x: 1 if x['Recency'] > 180 and x['Frequency'] <= 1 and x['Monetary'] <= 1000 else 0, axis=1)
RFM.churn.value_counts()
Out[ ]:
0    13969
1     4515
Name: churn, dtype: int64

Now, we have to identify some key segments.

If the R-F-M score of any customer is 3-3-3. His Recency is good, frequency is more and Monetary is more. So, he is a Big spender.

Similarly, if his score is 2-3-3, then his Recency is better and frequency and monetary are good. This customer hasn’t purchased for some time but he buys frequently and spends more.

we can have something like the below for all different segments

In [ ]:
Image(url= "https://editor.analyticsvidhya.com/uploads/72488Segments.png")
Out[ ]:
In [ ]:
segment = [0]*len(RFM)

for i in range(0,len(RFM)):
    if RFM['RFM_SCORE'][i] <= 111:
        segment[i] = 'Churned'
    else:
        segment[i] = 'Not Churned'
RFM['segment'] = segment
RFM.segment.value_counts()
Out[ ]:
Not Churned    13564
Churned         4920
Name: segment, dtype: int64
In [ ]:
RFM.head()
Out[ ]:
CustomerKey Recency Frequency Monetary R F M RFM_SCORE RFM_Score_AGG churn segment
0 11000 270 3 8248.99 1 3 3 133 7 0 Not Churned
1 11001 49 3 6383.88 2 3 3 233 8 0 Not Churned
2 11002 339 3 8114.04 1 3 3 133 7 0 Not Churned
3 11003 263 3 8139.29 1 3 3 133 7 0 Not Churned
4 11004 272 3 8196.01 1 3 3 133 7 0 Not Churned
In [ ]:
if 'segment' not in SalesDataDF.columns:
    SalesDataDF = SalesDataDF.merge(RFM,left_on = 'CustKey',right_on ='CustomerKey')

TASK2 : CLASSFICATION MODEL

Classfication is a supervised learning method for the problem of identifying categorical label for the input data.¶

In [ ]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
sns.set(color_codes = True)
In [ ]:
SalesDataDF.head()
Out[ ]:
SalesOrderNumber ProductName ProductSubcategoryName ProductcategoryName OrderQuantity SalesAmount Margin OrderDate MonthName Quarter WeekNo OrderYear YYQQ CustKey CustomerName Gender BirthDate AGE YearlyIncome IncomeCategory SalesCountry SalesRegion CustomerKey Recency Frequency Monetary R F M RFM_SCORE RFM_Score_AGG churn segment
0 SO43697 Road-150 Red, 62 Road Bikes Bikes 1 3578.27 1406.98 2010-12-29 Dec Q4 52 2010 2010-Q4 21768 Cole Watson M 1946-08-22 77 70000 HIGH INCOME Canada North America 21768 289 2 4118.26 1 2 3 123 6 0 Not Churned
1 SO56212 Mountain-500 Black, 52 Mountain Bikes Bikes 1 539.99 245.41 2013-04-14 Apr Q2 15 2013 2013-Q2 21768 Cole Watson M 1946-08-22 77 70000 HIGH INCOME Canada North America 21768 289 2 4118.26 1 2 3 123 6 0 Not Churned
2 SO43698 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 28389 Rachael Martinez F 1964-12-18 59 20000 LOW INCOME France Europe 28389 1126 1 3399.99 1 1 3 113 5 0 Not Churned
3 SO43699 Mountain-100 Silver, 44 Mountain Bikes Bikes 1 3399.99 1487.84 2010-12-29 Dec Q4 52 2010 2010-Q4 25863 Sydney Wright F 1946-12-03 77 40000 LOW INCOME United States North America 25863 186 2 4631.11 1 2 3 123 6 0 Not Churned
4 SO62866 Touring-2000 Blue, 50 Touring Bikes Bikes 1 1214.85 459.70 2013-07-26 Jul Q3 30 2013 2013-Q3 25863 Sydney Wright F 1946-12-03 77 40000 LOW INCOME United States North America 25863 186 2 4631.11 1 2 3 123 6 0 Not Churned
In [ ]:
#Label Encoding on the Package attribute
from sklearn.preprocessing import LabelEncoder

# creating instance of labelencoder
labelencoder = LabelEncoder()

#Assigning numerical values and storing in another column
SalesDataDF['Gender_class'] = labelencoder.fit_transform(SalesDataDF['Gender'])
SalesDataDF['churn'] = SalesDataDF.segment.apply(lambda x : 0 if x == 'Not Churned' else 1)

if 'Revenue' not in SalesDataDF.columns: 
    DF_cust_revenue  = SalesDataDF.groupby(by='CustKey',as_index=False)['SalesAmount'].sum().rename(columns={'SalesAmount':'Revenue'})
    SalesDataDF = SalesDataDF.merge(DF_cust_revenue,on='CustKey')
if 'MaxOrderDate' not in SalesDataDF.columns:
    DF_cust_maxdate = SalesDataDF.groupby(by='CustKey',as_index=False)['OrderDate'].max().rename(columns={'OrderDate':'MaxOrderDate'})
    SalesDataDF = SalesDataDF.merge(DF_cust_maxdate,on='CustKey')
In [ ]:
dimCust = DimCustomer.toPandas()
dimCust.CustomerKey = dimCust.CustomerKey.astype(int)
In [ ]:
SalesDataDF['rank_cust_by_OrderDate'] = SalesDataDF.groupby('CustKey')['OrderDate'].rank(method='first')
SalesDataDFUniqueOnly  = SalesDataDF[(SalesDataDF['rank_cust_by_OrderDate'] == 1)]
In [ ]:
if 'churn' not in dimCust.columns:
    dimCust = dimCust.merge(SalesDataDFUniqueOnly[['CustomerKey','AGE','churn','Gender_class','Revenue','MaxOrderDate',
         'R','F','M','Recency','Frequency','SalesCountry','SalesRegion']] ,
        on = 'CustomerKey', how = 'left')
print("Shape of Cust Dataframe : ",dimCust.shape)
dimCust.head()
Shape of Cust Dataframe :  (18484, 41)
Out[ ]:
CustomerKey GeographyKey CustomerAlternateKey Title FirstName MiddleName LastName NameStyle BirthDate MaritalStatus Suffix Gender EmailAddress YearlyIncome TotalChildren NumberChildrenAtHome EnglishEducation SpanishEducation FrenchEducation EnglishOccupation SpanishOccupation FrenchOccupation HouseOwnerFlag NumberCarsOwned AddressLine1 AddressLine2 Phone DateFirstPurchase CommuteDistance AGE churn Gender_class Revenue MaxOrderDate R F M Recency Frequency SalesCountry SalesRegion
0 11000 26 AW00011000 None Jon V Yang False 1966-04-08 M None M jon24@adventure-works.com 90000 2 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 0 3761 N. 14th St None 1 (11) 500 555-0162 2005-07-22 1-2 Miles 57 0 1 8248.99 2013-05-03 1 3 3 270 3 Australia Pacific
1 11001 37 AW00011001 None Eugene L Huang False 1965-05-14 S None M eugene10@adventure-works.com 60000 3 3 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 0 1 2243 W St. None 1 (11) 500 555-0110 2005-07-18 0-1 Miles 58 0 1 6383.88 2013-12-10 2 3 3 49 3 Australia Pacific
2 11002 31 AW00011002 None Ruben None Torres False 1965-08-12 M None M ruben35@adventure-works.com 60000 3 3 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 1 5844 Linden Land None 1 (11) 500 555-0184 2005-07-10 2-5 Miles 58 0 1 8114.04 2013-02-23 1 3 3 339 3 Australia Pacific
3 11003 11 AW00011003 None Christy None Zhu False 1968-02-15 S None F christy12@adventure-works.com 70000 0 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 0 1 1825 Village Pl. None 1 (11) 500 555-0162 2005-07-01 5-10 Miles 55 0 0 8139.29 2013-05-10 1 3 3 263 3 Australia Pacific
4 11004 19 AW00011004 None Elizabeth None Johnson False 1968-08-08 S None F elizabeth5@adventure-works.com 80000 5 5 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 4 7553 Harness Circle None 1 (11) 500 555-0131 2005-07-26 1-2 Miles 55 0 0 8196.01 2013-05-01 1 3 3 272 3 Australia Pacific
In [ ]:
dimCust["DateFirstPurchase"] = pd.to_datetime(dimCust["DateFirstPurchase"]).dt.date
dimCust["MaxOrderDate"] = pd.to_datetime(dimCust["MaxOrderDate"]).dt.date
dimCust['Custlife'] = (dimCust["MaxOrderDate"] - dimCust["DateFirstPurchase"]).dt.days
In [ ]:
dimCust.head(n = 10)
# dimCust.to_excel('cust.xlsx',index= False)
Out[ ]:
CustomerKey GeographyKey CustomerAlternateKey Title FirstName MiddleName LastName NameStyle BirthDate MaritalStatus Suffix Gender EmailAddress YearlyIncome TotalChildren NumberChildrenAtHome EnglishEducation SpanishEducation FrenchEducation EnglishOccupation SpanishOccupation FrenchOccupation HouseOwnerFlag NumberCarsOwned AddressLine1 AddressLine2 Phone DateFirstPurchase CommuteDistance AGE churn Gender_class Revenue MaxOrderDate R F M Recency Frequency SalesCountry SalesRegion Custlife
0 11000 26 AW00011000 None Jon V Yang False 1966-04-08 M None M jon24@adventure-works.com 90000 2 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 0 3761 N. 14th St None 1 (11) 500 555-0162 2005-07-22 1-2 Miles 57 0 1 8248.99 2013-05-03 1 3 3 270 3 Australia Pacific 2842
1 11001 37 AW00011001 None Eugene L Huang False 1965-05-14 S None M eugene10@adventure-works.com 60000 3 3 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 0 1 2243 W St. None 1 (11) 500 555-0110 2005-07-18 0-1 Miles 58 0 1 6383.88 2013-12-10 2 3 3 49 3 Australia Pacific 3067
2 11002 31 AW00011002 None Ruben None Torres False 1965-08-12 M None M ruben35@adventure-works.com 60000 3 3 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 1 5844 Linden Land None 1 (11) 500 555-0184 2005-07-10 2-5 Miles 58 0 1 8114.04 2013-02-23 1 3 3 339 3 Australia Pacific 2785
3 11003 11 AW00011003 None Christy None Zhu False 1968-02-15 S None F christy12@adventure-works.com 70000 0 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 0 1 1825 Village Pl. None 1 (11) 500 555-0162 2005-07-01 5-10 Miles 55 0 0 8139.29 2013-05-10 1 3 3 263 3 Australia Pacific 2870
4 11004 19 AW00011004 None Elizabeth None Johnson False 1968-08-08 S None F elizabeth5@adventure-works.com 80000 5 5 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 4 7553 Harness Circle None 1 (11) 500 555-0131 2005-07-26 1-2 Miles 55 0 0 8196.01 2013-05-01 1 3 3 272 3 Australia Pacific 2836
5 11005 22 AW00011005 None Julio None Ruiz False 1965-08-05 S None M julio1@adventure-works.com 70000 0 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 1 7305 Humphrey Drive None 1 (11) 500 555-0151 2005-07-02 5-10 Miles 58 0 1 8121.33 2013-05-02 1 3 3 271 3 Australia Pacific 2861
6 11006 8 AW00011006 None Janet G Alvarez False 1965-12-06 S None F janet9@adventure-works.com 70000 0 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 1 2612 Berry Dr None 1 (11) 500 555-0184 2005-07-27 5-10 Miles 58 0 0 8119.03 2013-05-14 1 3 3 259 3 Australia Pacific 2848
7 11007 40 AW00011007 None Marco None Mehta False 1964-05-09 M None M marco14@adventure-works.com 60000 3 3 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 2 942 Brook Street None 1 (11) 500 555-0126 2005-07-12 0-1 Miles 59 0 1 8211.00 2013-03-19 1 3 3 315 3 Australia Pacific 2807
8 11008 32 AW00011008 None Rob None Verhoff False 1964-07-07 S None F rob4@adventure-works.com 60000 4 4 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 1 3 624 Peabody Road None 1 (11) 500 555-0164 2005-07-28 10+ Miles 59 0 0 8106.31 2013-03-02 1 3 3 332 3 Australia Pacific 2774
9 11009 25 AW00011009 None Shannon C Carlson False 1964-04-01 S None M shannon38@adventure-works.com 70000 0 0 Bachelors Licenciatura Bac + 4 Professional Profesional Cadre 0 1 3839 Northgate Road None 1 (11) 500 555-0110 2005-07-30 5-10 Miles 59 0 1 8091.33 2013-05-09 1 3 3 264 3 Australia Pacific 2840
In [ ]:
#Assigning numerical values and storing in another column
dimCust['SalesRegion_class'] = labelencoder.fit_transform(dimCust['SalesRegion'])
dimCust['EnglishEducation_class'] = labelencoder.fit_transform(dimCust['EnglishEducation'])
dimCust['EnglishOccupation_class'] = labelencoder.fit_transform(dimCust['EnglishOccupation'])
In [ ]:
dimCust.YearlyIncome = dimCust.YearlyIncome.astype(int)
dimCust.TotalChildren = dimCust.TotalChildren.astype(int)
dimCust.NumberChildrenAtHome = dimCust.NumberChildrenAtHome.astype(int)
dimCust.HouseOwnerFlag = dimCust.HouseOwnerFlag.astype(int)
dimCust.NumberCarsOwned = dimCust.NumberCarsOwned.astype(int)



#Using Pearson Correlation ( association )
plt.figure(figsize=(16,14))
cor = dimCust[['AGE','YearlyIncome','F','Revenue','Custlife','churn','SalesRegion_class','Recency','TotalChildren','R']].corr()
sns.heatmap(cor, annot=True, cmap=plt.cm.Reds)
plt.show()

create feature vector¶

In [ ]:
cor[['churn']].sort_values(by='churn',ascending=False).style.background_gradient(cmap='viridis', axis=None)
Out[ ]:
  churn
churn 1.000000
Recency 0.092123
AGE 0.074776
TotalChildren 0.070059
YearlyIncome -0.073233
SalesRegion_class -0.119821
R -0.247334
Custlife -0.365873
F -0.426398
Revenue -0.440422
In [ ]:
spDFCust = spark.createDataFrame(dimCust)
if 'features' not in spDFCust.columns:
    assembler = VectorAssembler(
        inputCols=['AGE', 'YearlyIncome', 'Gender_class','Revenue','NumberCarsOwned','HouseOwnerFlag','TotalChildren'],
        outputCol='features')
    spDFCust = assembler.transform(spDFCust)
spDFCustchurn = spDFCust.select(['features', 'churn'])
spDFCustchurn.show(10,truncate=False)
+-------------------------------------------------+-----+
|features                                         |churn|
+-------------------------------------------------+-----+
|[57.0,90000.0,1.0,8248.99,0.0,1.0,2.0]           |0    |
|[58.0,60000.0,1.0,6383.879999999999,1.0,0.0,3.0] |0    |
|[58.0,60000.0,1.0,8114.04,1.0,1.0,3.0]           |0    |
|[55.0,70000.0,0.0,8139.29,1.0,0.0,0.0]           |0    |
|[55.0,80000.0,0.0,8196.01,4.0,1.0,5.0]           |0    |
|[58.0,70000.0,1.0,8121.33,1.0,1.0,0.0]           |0    |
|[58.0,70000.0,0.0,8119.03,1.0,1.0,0.0]           |0    |
|[59.0,60000.0,1.0,8211.0,2.0,1.0,3.0]            |0    |
|[59.0,60000.0,0.0,8106.3099999999995,3.0,1.0,4.0]|0    |
|[59.0,70000.0,1.0,8091.33,1.0,0.0,0.0]           |0    |
+-------------------------------------------------+-----+
only showing top 10 rows

Split into training and testing sets¶

In [ ]:
train, test = spDFCustchurn.randomSplit([0.9, 0.1], seed=42)

Initialise classifier :¶

  • fit model

  • Making Predictions

In [ ]:
LGR = LogisticRegression(featuresCol='features', labelCol='churn')
LGR_model = LGR.fit(train)

### Predict and calculate accuracy

#making predictions
predictions = LGR_model.transform(test)

## Lable is "churn" columns which is also the target variable.

accuracy = predictions.filter(predictions.churn == predictions.prediction).count() / float(predictions.count())
print("Accuracy %: ",round(accuracy,2)*100)
Accuracy %:  90.0

Calculate evaluation metrics¶

In [ ]:
# Calculate evaluation metrics
label = "churn"
tp = predictions.filter((predictions.churn == 1) & (predictions.prediction == 1)).count()
tn = predictions.filter((predictions.churn == 0) & (predictions.prediction == 0)).count()
fp = predictions.filter((predictions.churn == 0) & (predictions.prediction == 1)).count()
fn = predictions.filter((predictions.churn == 1) & (predictions.prediction == 0)).count()

print("TRUE POSTIVE : Predicted value is postive and it's postive  : ",tp)
print("TRUE NEGETIVE : Predicted value is negetive and it's Negetive : ",tn)
print("FALSE POSTIVE : FP : Predicted value is postive and it's Negetive : ",fp)
print("FALSE NEGETIVE : FN : Predicted value is NEGETIVE and it's postive : ",fn)

accuracy = (tp + tn) / (tp + tn + fp + fn)

try:
    precision = tp / (tp + fp)
except ZeroDivisionError:
    precision = 0
    
try:
    recall = tp / (tp + fn)
except ZeroDivisionError:
    recall = tp / (tp + fn)

if precision != 0 or recall != 0:
    f1_score = 2 * (precision * recall) / (precision + recall)
else:
    f1_score = 0

# Calculate AUC ROC
evaluator = BinaryClassificationEvaluator(labelCol=label, rawPredictionCol="prediction")
roc_auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
TRUE POSTIVE : Predicted value is postive and it's postive  :  404
TRUE NEGETIVE : Predicted value is negetive and it's Negetive :  1236
FALSE POSTIVE : FP : Predicted value is postive and it's Negetive :  107
FALSE NEGETIVE : FN : Predicted value is NEGETIVE and it's postive :  77
In [ ]:
print("Accuracy :", round(accuracy,2))
print("Precision :", round(precision,2))
print("Recall :", round(recall,2))
print("F1 Score :", round(f1_score,2))
print("AUC ROC :", round(roc_auc,2))
Accuracy : 0.9
Precision : 0.79
Recall : 0.84
F1 Score : 0.81
AUC ROC : 0.88
In [ ]:
Image(url= "https://miro.medium.com/max/828/1*3yGLac6F4mTENnj5dBNvNQ.webp")
Out[ ]:
In [ ]:
# confusion matrix 
group_names = ['True Neg','False Neg','False Pos','True Pos']
test_cnf_matrix = np.array([
    [tn, fn],
    [fp, tp]
])
test_counts = ["{0:0.0f}".format(value) for value in test_cnf_matrix.flatten()]
test_percentage = ["{0:.2%}".format(value) for value in test_cnf_matrix .flatten()/np.sum(test_cnf_matrix)]
test_labels = [f"{v1}\n{v2}\n{v3}" for v1, v2, v3 in zip(group_names,test_counts,test_percentage)]
test_labels = np.asarray(test_labels).reshape(2,2)
plt.figure(figsize = (16,5))
sns.heatmap(test_cnf_matrix, annot = test_labels, cbar = False,fmt='', cmap='Blues');
plt.ylabel("Prediction")
plt.xlabel("Actual")
plt.title("Confusion Matrix")
plt.rcParams.update({'font.size': 50})
plt.show()

Inference About AUC : Area Under Curve¶

AUC-ROC is another metric we use for evaluating the performance among the models. AUC-ROC is used to determine how well the model can distinguish the different labels/classes, and takes both the true positive rate (TPR) and false positive rate (FPR) into account

TASK3 : REGRESSION

Regression is a supervised learning method for the problem of identifying numeric value for the input data.¶

Use Case : We aim to implement Linear Regression to effciently predict the total amount spent(Revenue) by a customer

In [ ]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
In [ ]:
cor[['Revenue']].sort_values(by='Revenue',ascending=False).style.background_gradient(cmap='viridis', axis=None)
Out[ ]:
  Revenue
Revenue 1.000000
Custlife 0.784837
F 0.648306
YearlyIncome 0.161680
SalesRegion_class 0.116298
Recency 0.073955
AGE -0.049277
TotalChildren -0.073806
R -0.105588
churn -0.440422
In [ ]:
spDFCust2 = spark.createDataFrame(dimCust)
if 'features' not in spDFCust2.columns:
    assembler = VectorAssembler(
        inputCols=['YearlyIncome','F','Custlife','SalesRegion_class','Recency','TotalChildren','R'],
        outputCol='features')
    spDFCust2 = assembler.transform(spDFCust2)
spDF_revenue = spDFCust2.select(['features', 'Revenue'])
spDF_revenue.show(10,truncate=False)
+--------------------------------------+------------------+
|features                              |Revenue           |
+--------------------------------------+------------------+
|[90000.0,3.0,2842.0,2.0,270.0,2.0,1.0]|8248.99           |
|[60000.0,3.0,3067.0,2.0,49.0,3.0,2.0] |6383.879999999999 |
|[60000.0,3.0,2785.0,2.0,339.0,3.0,1.0]|8114.04           |
|[70000.0,3.0,2870.0,2.0,263.0,0.0,1.0]|8139.29           |
|[80000.0,3.0,2836.0,2.0,272.0,5.0,1.0]|8196.01           |
|[70000.0,3.0,2861.0,2.0,271.0,0.0,1.0]|8121.33           |
|[70000.0,3.0,2848.0,2.0,259.0,0.0,1.0]|8119.03           |
|[60000.0,3.0,2807.0,2.0,315.0,3.0,1.0]|8211.0            |
|[60000.0,3.0,2774.0,2.0,332.0,4.0,1.0]|8106.3099999999995|
|[70000.0,3.0,2840.0,2.0,264.0,0.0,1.0]|8091.33           |
+--------------------------------------+------------------+
only showing top 10 rows

In [ ]:
from scipy import stats
from scipy.stats import norm, skew 
# Let's Explore how SalePrice is distributed against normal theoretical quantiles
fig = plt.figure()
ax = fig.add_subplot()
res = stats.probplot(dimCust['Revenue'], plot=plt)
In [ ]:
train, test = spDF_revenue.randomSplit([0.7, 0.3], seed=42)
In [ ]:
LR = LinearRegression(featuresCol = 'features', labelCol='Revenue', maxIter=10, regParam=0.3, elasticNetParam=0.8)
LR_model = LR.fit(train)
print("Coefficients: " + str(LR_model.coefficients))
print("Intercept: " + str(LR_model.intercept))
Coefficients: [0.007290847809763265,633.0704071052016,5.595618502784839,-222.946566278741,2.9742027239176037,-31.068755794926304,-157.36611210121225]
Intercept: -11970.875985788198
In [ ]:
trainingSummary = LR_model.summary
print("RMSE: %.02f" % trainingSummary.rootMeanSquaredError)
print("r2: %0.2f" % trainingSummary.r2)
RMSE: 1209.27
r2: 0.69
In [ ]:
predictions = LR_model.transform(test)
predictions.select("prediction","Revenue","features").show(5)

lr_evaluator = RegressionEvaluator(predictionCol="prediction",labelCol="Revenue",metricName="r2")
print("Prediction of R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))
+------------------+-------+--------------------+
|        prediction|Revenue|            features|
+------------------+-------+--------------------+
|1.4662994572408934|  34.99|[10000.0,1.0,2007...|
| 25.58066686100028|  33.98|[10000.0,1.0,2007...|
|328.94934470059525|  13.98|[10000.0,1.0,2007...|
| 472.0318210610567|  34.99|[10000.0,1.0,2007...|
| 653.1374416076142|  28.99|[10000.0,1.0,2007...|
+------------------+-------+--------------------+
only showing top 5 rows

Prediction of R Squared (R2) on test data = 0.67309

Residual = actual value — predicted value¶

In [ ]:
label = "Revenue"
plt.title("Actual Vs Predicted Revenue")
sns.scatterplot(x="prediction", y=label, data=predictions.toPandas())
Out[ ]:
<AxesSubplot:title={'center':'Actual Vs Predicted Revenue'}, xlabel='prediction', ylabel='Revenue'>
In [ ]:
x_ax = range(0, predictions.count())
y_pred = predictions.select("prediction").collect()
y_orig = predictions.select("Revenue").collect()
plt.plot(x_ax, y_orig, label="original")
plt.plot(x_ax, y_pred, label="predicted")
plt.title("Customer Revenue and predicted Revenue")
plt.xlabel('X-axis')
plt.ylabel('Y-axis')
plt.legend(loc='best',fancybox=True, shadow=True)
plt.grid(True)
plt.show()  
In [ ]:
predictions_pd = predictions.toPandas()

# Use seaborn to plot the predicted values against the actual values

sns.regplot(x="Revenue", y="prediction", data=predictions_pd)
Out[ ]:
<AxesSubplot:xlabel='Revenue', ylabel='prediction'>
In [ ]: